SpringEvent事件編程

SpringEvent 事件驱动介绍

Spring中的事件驱动模型也叫作发布订阅模式,是观察者模式的一个典型的应用
Spring事件驱动模型中存在三个角色: 事件原型、事件发布者、事件监听者.

Spring事件原型

Spring事件定义通过ApplicationEvent,该类继承自Jdk的EventObject; JDK要求所有事件将继承它,并通过source得到事件源, 比如我们的AWT事件体系也是继承自它;

ApplicationEvent规范定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public abstract class ApplicationEvent extends EventObject {

/** use serialVersionUID from Spring 1.2 for interoperability */
private static final long serialVersionUID = 7099057708183571937L;

/** System time when the event happened */
private final long timestamp;

/**
* Create a new ApplicationEvent.
* @param source the object on which the event initially occurred (never {@code null})
*/
public ApplicationEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}

/**
* Return the system time in milliseconds when the event happened.
*/
public final long getTimestamp() {
return this.timestamp;
}
}

Spring事件发布者

Spring事件发布规范定义在ApplicationEventPublisher中,主要用于事件发布者发布中事件;

ApplicationEventPublisher规范定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface ApplicationEventPublisher {

/**
* Notify all <strong>matching</strong> listeners registered with this
* application of an application event. Events may be framework events
* (such as RequestHandledEvent) or application-specific events.
* @param event the event to publish
* @see org.springframework.web.context.support.RequestHandledEvent
*/
void publishEvent(ApplicationEvent event);

/**
* Notify all <strong>matching</strong> listeners registered with this
* application of an event.
* <p>If the specified {@code event} is not an {@link ApplicationEvent},
* it is wrapped in a {@link PayloadApplicationEvent}.
* @param event the event to publish
* @since 4.2
* @see PayloadApplicationEvent
*/
void publishEvent(Object event);
}

ApplicationContext接口就集成了ApplicationEventPublisher,如图:

对于ApplicationContext, Spring提供了默认的实现, 在抽象类AbstractApplicationContext中, 常见的AnnotationConfigWebApplicationContextClassPathXmlApplicationContextFileSystemXmlApplicationContext都有继承AbstractApplicationContext的事件行为:

AbstractApplicationContext类中实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Publish the given event to all listeners.
* @param event the event to publish (may be an {@link ApplicationEvent}
* or a payload object to be turned into a {@link PayloadApplicationEvent})
* @param eventType the resolved event type, if known
* @since 4.2
*/
protected void publishEvent(Object event, ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
if (logger.isTraceEnabled()) {
logger.trace("Publishing event in " + getDisplayName() + ": " + event);
}

// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<Object>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType();
}
}

// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}

// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}

在这个方法中,我们看到了一getApplicationEventMulticaster();ApplicationEventMulticaster.属于事件广播器,它的作用是把ApplicationContext发布的Event广播给所有的监听器.

Spring事件监听者

Spring中ApplicationListener是事件监控者规范的定义, 定义内容如下:

1
2
3
4
5
6
7
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
/**
* Handle an application event.
* @param event the event to respond to
*/
void onApplicationEvent(E event);
}

ApplicationListener继承自jdk的EventListener,所有的监听器都要实现这个接口,这个接口只有一个onApplicationEvent()方法,该方法接受一个ApplicationEvent或其子类对象作为参数,在方法体中,可以通过不同对Event类的判断来进行相应的处理.当事件触发时所有的监听器都会收到消息,如果你需要对监听器的接收顺序有要求,可以实现该接口的一个子接口SmartApplicationListener,通过这个接口可以指定监听器接收事件的顺序.

SmartApplicationListener的接口集成关系如下:

SpringEvent 事件应用示例

事件定义

1
2
3
4
5
6
public class PlanAllocatePubEvent extends ApplicationEvent {

public PlanAllocatePubEvent(Long allocateId) {
super(allocateId);
}
}

事件发布

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class PlanAllocatePubPublisher {

private static final Logger LOG = LoggerFactory.getLogger(PlanAllocatePubPublisher.class);

@Autowired
private ApplicationEventPublisher applicationEventPublisher;

public void publishEvent(long allocateId){
LOG.info("[事件发布] 数据创建, 编号:" + allocateId);
PlanAllocatePubEvent event = new PlanAllocatePubEvent(allocateId);
applicationEventPublisher.publishEvent(event);
}
}

事件监听(无序)

同步事件处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class SyncPlanAllocatePubListener implements ApplicationListener<PlanAllocatePubEvent>{

private static final Logger LOG = LoggerFactory.getLogger(SyncPlanAllocatePubListener.class);

@Override
public void onApplicationEvent(PlanAllocatePubEvent planAllocatePubEvent) {
LOG.info("[同步][事件监听][开始]数据创建, 数据编号:" + planAllocatePubEvent.getSource());
doSomething();
LOG.info("[同步][事件监听][结束]数据创建, 数据编号:" + planAllocatePubEvent.getSource());
}

public void doSomething(){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
LOG.error(e.getLocalizedMessage(), e);
}
}

}

异步事件处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
@EnableAsync
public class AsynPlanAllocatePubListener implements ApplicationListener<PlanAllocatePubEvent> {

private static final Logger LOG = LoggerFactory.getLogger(AsynPlanAllocatePubListener.class);

@Async
@Override
public void onApplicationEvent(PlanAllocatePubEvent planAllocatePubEvent) {
LOG.info("[异步][事件监听][开始]数据创建, 数据编号:" + planAllocatePubEvent.getSource());
doSomething();
LOG.info("[异步][事件监听][结束]数据创建, 数据编号:" + planAllocatePubEvent.getSource());
}

public void doSomething(){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
LOG.error(e.getLocalizedMessage(), e);
}
}

}

事件监听(有序)

示例代码: 事件监听逻辑1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Component
public class Order1PlanAllocatePubListener implements SmartApplicationListener {

private static final Logger LOG = LoggerFactory.getLogger(Order1PlanAllocatePubListener.class);

/**
* supportsEventType用于指定支持的事件类型,只有支持的才调用onApplicationEvent
* @param eventType
* @return
*/
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return eventType == PlanAllocatePubEvent.class;
}

/**
* supportsSourceType支持的目标类型,只有支持的才调用onApplicationEvent
* @param sourceType
* @return
*/
@Override
public boolean supportsSourceType(Class<?> sourceType) {
return sourceType == Long.class;
}

@Override
public void onApplicationEvent(ApplicationEvent event) {
LOG.info("[有序事件][事件监听][Order:1]数据创建, 编号:" + event.getSource());
doSomething();
}

/**
* 优先级顺序,越小优先级越高
* @return
*/
@Override
public int getOrder() {
return 1;
}

public void doSomething(){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
LOG.error(e.getLocalizedMessage(), e);
}
}
}

示例代码: 事件监听逻辑2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Component
public class Order2PlanAllocatePubListener implements SmartApplicationListener{

private static final Logger LOG = LoggerFactory.getLogger(Order2PlanAllocatePubListener.class);

/**
* supportsEventType用于指定支持的事件类型,只有支持的才调用onApplicationEvent
* @param eventType
* @return
*/
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return eventType == PlanAllocatePubEvent.class;
}

/**
* supportsSourceType支持的目标类型,只有支持的才调用onApplicationEvent
* @param sourceType
* @return
*/
@Override
public boolean supportsSourceType(Class<?> sourceType) {
return sourceType == Long.class;
}

@Override
public void onApplicationEvent(ApplicationEvent event) {
LOG.info("[有序事件][事件监听][Order:2]数据创建, 编号:" + event.getSource());
doSomething();
}

/**
* 优先级顺序,越小优先级越高
* @return
*/
@Override
public int getOrder() {
return 2;
}

public void doSomething(){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
LOG.error(e.getLocalizedMessage(), e);
}
}
}

测试事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

@Controller
@RequestMapping(value = "/api/event")
public class ApiPluginEventController {

@Autowired
private PlanAllocatePubPublisher planAllocatePubPublisher;

@ResponseBody
@GetMapping(value = "/publish")
public Map pubEvent() throws Exception {
for(int i=1; i<5; i++){
planAllocatePubPublisher.publishEvent(i);
}
return ImmutableMap.builder().put("message", "ok").build();
}
}
异步事件执行结果
1
2
3
4
5
6
7
8
9
10
11
12
INFO:2017-09-11 20:14:08.799[publishEvent] [事件发布] 数据创建, 数据编号:1 
INFO:2017-09-11 20:14:08.803[publishEvent] [事件发布] 数据创建, 数据编号:2
INFO:2017-09-11 20:14:08.803[onApplicationEvent] [异步][事件监听][开始]数据创建, 数据编号:1
INFO:2017-09-11 20:14:08.804[publishEvent] [事件发布] 数据创建, 数据编号:3
INFO:2017-09-11 20:14:08.804[onApplicationEvent] [异步][事件监听][开始]数据创建, 数据编号:2
INFO:2017-09-11 20:14:08.805[onApplicationEvent] [异步][事件监听][开始]数据创建, 数据编号:3
INFO:2017-09-11 20:14:08.804[publishEvent] [事件发布] 数据创建, 数据编号4
INFO:2017-09-11 20:14:08.805[onApplicationEvent] [异步][事件监听][开始]数据创建, 数据编号:4
INFO:2017-09-11 20:14:11.807[onApplicationEvent] [异步][事件监听][结束]数据创建, 数据编号:1
INFO:2017-09-11 20:14:11.807[onApplicationEvent] [异步][事件监听][结束]数据创建, 数据编号:2
INFO:2017-09-11 20:14:11.807[onApplicationEvent] [异步][事件监听][结束]数据创建, 数据编号:3
INFO:2017-09-11 20:14:11.809[onApplicationEvent] [异步][事件监听][结束]数据创建, 数据编号:4
同步事件执行结果
1
2
3
4
5
6
7
8
9
10
11
12
INFO:2017-09-11 20:17:07.470[publishEvent] [事件发布] 数据创建, 数据编号:1 
INFO:2017-09-11 20:17:07.473[onApplicationEvent] [同步][事件监听][开始]数据创建, 数据编号:1
INFO:2017-09-11 20:17:09.476[onApplicationEvent] [同步][事件监听][结束]数据创建, 数据编号:1
INFO:2017-09-11 20:17:09.477[publishEvent] [事件发布] 数据创建, 数据编号:2
INFO:2017-09-11 20:17:09.478[onApplicationEvent] [同步][事件监听][开始]数据创建, 数据编号:2
INFO:2017-09-11 20:17:11.481[onApplicationEvent] [同步][事件监听][结束]数据创建, 数据编号:2
INFO:2017-09-11 20:17:11.481[publishEvent] [事件发布] 数据创建, 数据编号:3
INFO:2017-09-11 20:17:11.482[onApplicationEvent] [同步][事件监听][开始]数据创建, 数据编号:3
INFO:2017-09-11 20:17:13.486[onApplicationEvent] [同步][事件监听][结束]数据创建, 数据编号:3
INFO:2017-09-11 20:17:13.487[publishEvent] [事件发布] 数据创建, 数据编号:4
INFO:2017-09-11 20:17:13.487[onApplicationEvent] [同步][事件监听][开始]数据创建, 数据编号:4
INFO:2017-09-11 20:17:15.492[onApplicationEvent] [同步][事件监听][结束]数据创建, 数据编号:4
有序事件执行结果
1
2
3
4
5
6
7
8
9
10
11
12
INFO:2017-09-11 20:33:12.139[publishEvent] [事件发布] 数据创建, 编号:1 
INFO:2017-09-11 20:33:12.140[onApplicationEvent] [有序事件][事件监听][Order:1]数据创建, 编号:1
INFO:2017-09-11 20:33:15.145[onApplicationEvent] [有序事件][事件监听][Order:2]数据创建, 编号:1
INFO:2017-09-11 20:33:18.146[publishEvent] [事件发布] 数据创建, 编号:2
INFO:2017-09-11 20:33:18.147[onApplicationEvent] [有序事件][事件监听][Order:1]数据创建, 编号:2
INFO:2017-09-11 20:33:21.151[onApplicationEvent] [有序事件][事件监听][Order:2]数据创建, 编号:2
INFO:2017-09-11 20:33:24.156[publishEvent] [事件发布] 数据创建, 编号:3
INFO:2017-09-11 20:33:24.157[onApplicationEvent] [有序事件][事件监听][Order:1]数据创建, 编号:3
INFO:2017-09-11 20:33:27.162[onApplicationEvent] [有序事件][事件监听][Order:2]数据创建, 编号:3
INFO:2017-09-11 20:33:30.167[publishEvent] [事件发布] 数据创建, 编号:4
INFO:2017-09-11 20:33:30.168[onApplicationEvent] [有序事件][事件监听][Order:1]数据创建, 编号:4
INFO:2017-09-11 20:33:33.171[onApplicationEvent] [有序事件][事件监听][Order:2]数据创建, 编号:4

注解事件监听

Spring4.2开始提供了@EventListener注解,使得监听器不再需要实现ApplicationListener接口,只需要在监听方法上加上该注解即可,方法不一定叫onApplicationEvent,但有且只能有一个参数,指定监听的事件类型.
上面示例中的有序事件监听逻辑可以使用注解方式实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Component
public class PlanAllocatePubHandler {

private static final Logger LOG = LoggerFactory.getLogger(PlanAllocatePubHandler.class);

/**
* 基于注解的事件监听
* @param planAllocatePubEvent
*/
@EventListener
@Order(1)
public void annoEnvListener1(PlanAllocatePubEvent planAllocatePubEvent) {
LOG.info("[注解事件监听][Order:1]数据创建, 编号:" + planAllocatePubEvent.getSource());
doSomething();
}

/**
* 基于注解的事件监听
* @param planAllocatePubEvent
*/
@EventListener
@Order(2)
public void annoEnvListener2(PlanAllocatePubEvent planAllocatePubEvent) {
LOG.info("[注解事件监听][Order:2]数据创建, 编号:" + planAllocatePubEvent.getSource());
doSomething();
}

public void doSomething(){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
LOG.error(e.getLocalizedMessage(), e);
}
}
}
注解事件监听执行结果
1
2
3
4
5
6
7
8
9
10
11
12
INFO:2017-09-11 20:57:00.167[publishEvent] [事件发布] 数据创建, 编号:1 
INFO:2017-09-11 20:57:00.168[annoEnvListener1] [注解事件监听][Order:1]数据创建, 编号:1
INFO:2017-09-11 20:57:03.169[annoEnvListener2] [注解事件监听][Order:2]数据创建, 编号:1
INFO:2017-09-11 20:57:06.175[publishEvent] [事件发布] 数据创建, 编号:2
INFO:2017-09-11 20:57:06.176[annoEnvListener1] [注解事件监听][Order:1]数据创建, 编号:2
INFO:2017-09-11 20:57:09.180[annoEnvListener2] [注解事件监听][Order:2]数据创建, 编号:2
INFO:2017-09-11 20:57:12.184[publishEvent] [事件发布] 数据创建, 编号:3
INFO:2017-09-11 20:57:12.185[annoEnvListener1] [注解事件监听][Order:1]数据创建, 编号:3
INFO:2017-09-11 20:57:15.191[annoEnvListener2] [注解事件监听][Order:2]数据创建, 编号:3
INFO:2017-09-11 20:57:18.195[publishEvent] [事件发布] 数据创建, 编号:4
INFO:2017-09-11 20:57:18.196[annoEnvListener1] [注解事件监听][Order:1]数据创建, 编号:4
INFO:2017-09-11 20:57:21.198[annoEnvListener2] [注解事件监听][Order:2]数据创建, 编号:4